eigen_client_eth/
instrumented_client.rs

1use crate::client::BackendClient;
2use alloy::consensus::TxEnvelope;
3use alloy::primitives::{Address, BlockHash, BlockNumber, Bytes, ChainId, B256, U256, U64};
4use alloy::providers::{Provider, ProviderBuilder, RootProvider};
5use alloy::pubsub::{PubSubFrontend, Subscription};
6use alloy::rlp::Encodable;
7use alloy::rpc::json_rpc::{RpcParam, RpcReturn};
8use alloy::rpc::types::eth::{
9    Block, BlockNumberOrTag, FeeHistory, Filter, Header, Log, SyncStatus, Transaction,
10    TransactionReceipt, TransactionRequest,
11};
12use alloy::transports::http::{Client, Http};
13use alloy::transports::ws::WsConnect;
14use alloy::transports::{TransportError, TransportResult};
15use eigen_logging::get_test_logger;
16use eigen_metrics_collectors_rpc_calls::RpcCallsMetrics as RpcCallsCollector;
17use hex;
18use std::time::Instant;
19use thiserror::Error;
20use url::Url;
21
22const PENDING_TAG: &str = "pending";
23
/// This struct represents an instrumented client that can be used to interact with an Ethereum node.
/// It provides a set of methods to interact with the node and measures the duration of the calls.
///
/// Each constructor in this file populates exactly one of `http_client` / `ws_client`.
pub struct InstrumentedClient {
    /// HTTP provider; set by `new` and `new_from_client`.
    http_client: Option<RootProvider<Http<Client>>>,
    /// WebSocket (pub-sub) provider; set by `new_ws`. Required by the `subscribe_*` methods.
    ws_client: Option<RootProvider<PubSubFrontend>>,
    /// Collector that receives the measured request durations and supplies the error logger.
    rpc_collector: RpcCallsCollector,
    /// Result of the net-version query made at construction; passed along with every recorded
    /// duration.
    net_version: u64,
}
32
/// Errors returned when constructing an [`InstrumentedClient`] or by its
/// [`BackendClient`] implementation.
#[derive(Error, Debug)]
pub enum InstrumentedClientError {
    /// The URL string handed to a constructor could not be parsed.
    #[error("invalid url")]
    InvalidUrl,
    /// The net-version query issued while constructing the client failed.
    #[error("error getting version")]
    ErrorGettingVersion,
    /// A request to the node failed.
    #[error("error running command")]
    CommandError,
}
43
44#[async_trait::async_trait]
45impl BackendClient for InstrumentedClient {
46    type Error = InstrumentedClientError;
47
48    /// Returns the latest block number.
49    ///
50    /// # Returns
51    ///
52    /// The latest block number.
53    ///
54    /// # Errors
55    ///
56    /// Returns an error if the RPC call fails.
57    async fn block_number(&self) -> Result<BlockNumber, Self::Error> {
58        self.instrument_function("eth_blockNumber", ())
59            .await
60            .inspect_err(|err| {
61                self.rpc_collector
62                    .logger()
63                    .error("Failed to get block number", err.to_string().as_str())
64            })
65            .map_err(|_err| InstrumentedClientError::CommandError)
66            .map(|result: U64| result.to())
67    }
68
69    /// Returns the block having the given block number.
70    ///
71    /// # Arguments
72    ///
73    /// * `number` - The block number.
74    ///
75    /// # Returns
76    ///
77    /// The block having the given block number.
78    ///
79    /// # Errors
80    ///
81    /// Returns an error if the RPC call fails.
82    async fn block_by_number(
83        &self,
84        number: BlockNumberOrTag,
85    ) -> Result<Option<Block>, Self::Error> {
86        self.instrument_function("eth_getBlockByNumber", (number, true))
87            .await
88            .inspect_err(|err| {
89                self.rpc_collector
90                    .logger()
91                    .error("Failed to get block by number", err.to_string().as_str())
92            })
93            .map_err(|_err| InstrumentedClientError::CommandError)
94    }
95}
96
97impl InstrumentedClient {
98    /// Creates a new instance of the InstrumentedClient.
99    ///
100    /// # Arguments
101    ///
102    /// * `url` - The URL of the RPC server.
103    ///
104    /// # Returns
105    ///
106    /// A new instance of the InstrumentedClient.
107    ///
108    /// # Errors
109    ///
110    /// Returns an error if the URL is invalid or if there is an error getting the version.
111    pub async fn new(url: &str) -> Result<Self, InstrumentedClientError> {
112        let url = Url::parse(url).map_err(|_| InstrumentedClientError::InvalidUrl)?;
113        let http_client = ProviderBuilder::new().on_http(url);
114        let net_version = http_client
115            .get_net_version()
116            .await
117            .map_err(|_| InstrumentedClientError::ErrorGettingVersion)?;
118
119        let rpc_collector = RpcCallsCollector::new(get_test_logger().clone());
120        Ok(InstrumentedClient {
121            http_client: Some(http_client),
122            ws_client: None,
123            rpc_collector,
124            net_version,
125        })
126    }
127
128    /// Creates a new instance of the InstrumentedClient that supports ws connection.
129    ///
130    /// # Arguments
131    ///
132    /// * `url` - The ws URL of the RPC server .
133    ///
134    /// # Returns
135    ///
136    /// A new instance of the InstrumentedClient.
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the URL is invalid or if there is an error getting the version.
141    pub async fn new_ws(url: &str) -> Result<Self, InstrumentedClientError> {
142        let url = Url::parse(url).map_err(|_| InstrumentedClientError::InvalidUrl)?;
143        let ws_connect = WsConnect::new(url);
144
145        let ws_client = ProviderBuilder::new().on_ws(ws_connect).await.unwrap();
146        let net_version = ws_client
147            .get_net_version()
148            .await
149            .map_err(|_| InstrumentedClientError::ErrorGettingVersion)?;
150
151        let rpc_collector = RpcCallsCollector::new(get_test_logger().clone());
152        Ok(InstrumentedClient {
153            http_client: None,
154            ws_client: Some(ws_client),
155            rpc_collector,
156            net_version,
157        })
158    }
159
160    /// Creates a new instance of the InstrumentedClient from an existing client (`RootProvider`).
161    ///
162    /// # Arguments
163    ///
164    /// * `client` - The existing client (`RootProvider`).
165    ///
166    /// # Returns
167    ///
168    /// A new instance of the InstrumentedClient.
169    ///
170    /// # Errors
171    ///
172    /// Returns an error if there is an error getting the version.
173    pub async fn new_from_client(
174        client: RootProvider<Http<Client>>,
175    ) -> Result<Self, InstrumentedClientError> {
176        let net_version = client
177            .get_net_version()
178            .await
179            .map_err(|_| InstrumentedClientError::ErrorGettingVersion)?;
180
181        let rpc_collector = RpcCallsCollector::new(get_test_logger().clone());
182        Ok(InstrumentedClient {
183            http_client: Some(client),
184            ws_client: None,
185            rpc_collector,
186            net_version,
187        })
188    }
189
190    /// Returns the chain ID.
191    ///
192    /// # Returns
193    ///
194    /// The chain ID.
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if the RPC call fails.
199    pub async fn chain_id(&self) -> TransportResult<ChainId> {
200        self.instrument_function("eth_chainId", ())
201            .await
202            .inspect_err(|err| {
203                self.rpc_collector
204                    .logger()
205                    .error("Failed to get chain id", err.to_string().as_str())
206            })
207            .map(|result: U64| result.to())
208    }
209
210    /// Returns the balance of the account at the given block number.
211    ///
212    /// # Arguments
213    ///
214    /// * `account` - The account address.
215    /// * `block_number` - The block number.
216    ///
217    /// # Returns
218    ///
219    /// The balance of the account at the given block number.
220    pub async fn balance_at(
221        &self,
222        account: Address,
223        block_number: BlockNumberOrTag,
224    ) -> TransportResult<U256> {
225        self.instrument_function("eth_getBalance", (account, block_number))
226            .await
227            .inspect_err(|err| {
228                self.rpc_collector
229                    .logger()
230                    .error("Failed to get balance", err.to_string().as_str())
231            })
232    }
233
234    /// Returns the block having the given block hash.
235    ///
236    /// # Arguments
237    ///
238    /// * `hash` - The block hash.
239    ///
240    /// # Returns
241    ///
242    /// The block having the given block hash.
243    pub async fn block_by_hash(&self, hash: BlockHash) -> TransportResult<Option<Block>> {
244        self.instrument_function("eth_getBlockByHash", (hash, true))
245            .await
246            .inspect_err(|err| {
247                self.rpc_collector
248                    .logger()
249                    .error("Failed to get block by hash", err.to_string().as_str())
250            })
251    }
252
253    /// Executes a message call transaction.
254    ///
255    /// # Arguments
256    ///
257    /// * `call` - The message call to be executed
258    /// * `block_number` - The block height at which the call runs. *Note:* in case this argument is n
259    ///
260    /// # Returns
261    ///
262    /// The returned value of the executed contract.
263    pub async fn call_contract(
264        &self,
265        call: TransactionRequest,
266        block_number: BlockNumberOrTag,
267    ) -> TransportResult<Bytes> {
268        self.instrument_function("eth_call", (call, block_number))
269            .await
270            .inspect_err(|err| {
271                self.rpc_collector
272                    .logger()
273                    .error("Failed to call contract", err.to_string().as_str())
274            })
275    }
276
277    /// Returns the compiled bytecode of a smart contract given its address and block number.
278    ///
279    /// # Arguments
280    ///
281    /// * `address` - The address of the smart contract.
282    /// * `block_number` - The block number.
283    ///
284    /// # Returns
285    ///
286    /// The compiled bytecode of the smart contract with the given address and block number.
287    pub async fn code_at(
288        &self,
289        address: Address,
290        block_number: BlockNumberOrTag,
291    ) -> TransportResult<Bytes> {
292        self.instrument_function("eth_getCode", (address, block_number))
293            .await
294            .inspect_err(|err| {
295                self.rpc_collector
296                    .logger()
297                    .error("Failed to get code", err.to_string().as_str())
298            })
299    }
300
301    /// Estimates the gas needed to execute a specific transaction.
302    ///
303    /// # Arguments
304    ///
305    /// * `tx` - The transaction from which the gas consumption is estimated.
306    ///
307    /// # Returns
308    ///
309    /// The estimated gas.
310    pub async fn estimate_gas(&self, tx: TransactionRequest) -> TransportResult<u64> {
311        self.instrument_function("eth_estimateGas", (tx,))
312            .await
313            .inspect_err(|err| {
314                self.rpc_collector
315                    .logger()
316                    .error("Failed to estimate gas", err.to_string().as_str())
317            })
318            .map(|result: U64| result.to())
319    }
320
321    /// Returns a collection of historical gas information.
322    ///
323    /// # Arguments
324    ///
325    /// * `block_count` - The number of blocks to include in the collection.
326    /// * `last_block` - The last block number to include in the collection.
327    /// * `reward_percentiles` - A sorted list of percentage points used to
328    ///   sample the effective priority fees per gas from each block. The samples are
329    ///   taken in ascending order and weighted by gas usage. The list is sorted increasingly.
330    pub async fn fee_history(
331        &self,
332        block_count: u64,
333        last_block: BlockNumberOrTag,
334        reward_percentiles: &[f64],
335    ) -> TransportResult<FeeHistory> {
336        self.instrument_function(
337            "eth_feeHistory",
338            (block_count, last_block, reward_percentiles),
339        )
340        .await
341        .inspect_err(|err| {
342            self.rpc_collector
343                .logger()
344                .error("Failed to get fee history", err.to_string().as_str())
345        })
346    }
347    /// Executes a filter query.
348    ///
349    /// # Arguments
350    ///
351    /// * `filter` - The filter query to be executed.
352    ///
353    /// # Returns
354    ///
355    /// A vector of logs.
356    pub async fn filter_logs(&self, filter: Filter) -> TransportResult<Vec<Log>> {
357        self.instrument_function("eth_getLogs", (filter,))
358            .await
359            .inspect_err(|err| {
360                self.rpc_collector
361                    .logger()
362                    .error("Failed to get filter logs", err.to_string().as_str())
363            })
364    }
365
366    /// Returns the block header with the given hash.
367    ///
368    /// # Arguments
369    ///
370    /// * `hash` - The block hash.
371    ///
372    /// # Returns
373    ///
374    /// The block header.
375    pub async fn header_by_hash(&self, hash: B256) -> TransportResult<Header> {
376        let transaction_detail = false;
377        self.instrument_function("eth_getBlockByHash", (hash, transaction_detail))
378            .await
379            .inspect_err(|err| {
380                self.rpc_collector
381                    .logger()
382                    .error("Failed to get header by hash", err.to_string().as_str())
383            })
384    }
385
386    /// Returns a block header with the given block number.
387    ///
388    /// # Arguments
389    ///
390    /// * `block_number` - The block number.
391    ///
392    /// # Returns
393    ///
394    /// The block header.
395    pub async fn header_by_number(
396        &self,
397        block_number: BlockNumberOrTag,
398    ) -> TransportResult<Header> {
399        let transaction_detail = false;
400        self.instrument_function("eth_getBlockByNumber", (block_number, transaction_detail))
401            .await
402            .inspect_err(|err| {
403                self.rpc_collector
404                    .logger()
405                    .error("Failed to get header by number", err.to_string().as_str())
406            })
407    }
408
409    /// Returns the nonce of the given account.
410    ///
411    /// # Arguments
412    ///
413    /// * `account` - The address of the account.
414    /// * `block_number` - The block number from where the nonce is taken.
415    ///
416    /// # Returns
417    ///
418    /// The nonce of the account.
419    pub async fn nonce_at(
420        &self,
421        account: Address,
422        block_number: BlockNumberOrTag,
423    ) -> TransportResult<u64> {
424        self.instrument_function("eth_getTransactionCount", (account, block_number))
425            .await
426            .inspect_err(|err| {
427                self.rpc_collector
428                    .logger()
429                    .error("Failed to get nonce", err.to_string().as_str())
430            })
431            .map(|result: U64| result.to())
432    }
433
434    /// Returns the wei balance of the given account in the pending state.
435    ///
436    /// # Arguments
437    ///
438    /// * `account` - The address of the account.
439    ///
440    /// # Returns
441    ///
442    /// The wei balance of the account.
443    pub async fn pending_balance_at(&self, account: Address) -> TransportResult<U256> {
444        self.instrument_function("eth_getBalance", (account, PENDING_TAG))
445            .await
446            .inspect_err(|err| {
447                self.rpc_collector
448                    .logger()
449                    .error("Failed to get pending balance", err.to_string().as_str())
450            })
451    }
452
453    /// Executes a message call transaction using the EVM.
454    /// The state seen by the contract call is the pending state.
455    ///
456    /// # Arguments
457    ///
458    /// * `call` - The message call to be executed
459    ///
460    /// # Returns
461    ///
462    /// The returned value of the executed contract.
463    pub async fn pending_call_contract(&self, call: TransactionRequest) -> TransportResult<Bytes> {
464        self.call_contract(call, BlockNumberOrTag::Pending).await
465    }
466
467    /// Returns the contract code of the given account in the pending state.
468    ///
469    /// # Arguments
470    ///
471    /// * `account` - The address of the contract.
472    ///
473    /// # Returns
474    ///
475    /// The contract code.
476    pub async fn pending_code_at(&self, account: Address) -> TransportResult<Bytes> {
477        self.instrument_function("eth_getCode", (account, PENDING_TAG))
478            .await
479            .inspect_err(|err| {
480                self.rpc_collector
481                    .logger()
482                    .error("Failed to get pending code", err.to_string().as_str())
483            })
484    }
485
486    /// Returns the account nonce of the given account in the pending state.
487    /// This is the nonce that should be used for the next transaction.
488    ///
489    /// # Arguments
490    ///
491    /// * `account` - The address of the account.
492    ///
493    /// # Returns
494    ///
495    /// * The nonce of the account in the pending state.
496    pub async fn pending_nonce_at(&self, account: Address) -> TransportResult<u64> {
497        self.instrument_function("eth_getTransactionCount", (account, PENDING_TAG))
498            .await
499            .inspect_err(|err| {
500                self.rpc_collector
501                    .logger()
502                    .error("Failed to get pending nonce", err.to_string().as_str())
503            })
504            .map(|result: U64| result.to())
505    }
506
507    /// Returns the value of key in the contract storage of the given account in the pending state.
508    ///
509    /// # Arguments
510    ///
511    /// * `account` - The address of the contract.
512    /// * `key` - The position in the storage.
513    ///
514    /// # Returns
515    ///
516    /// The value of the storage position at the provided address.
517    pub async fn pending_storage_at(&self, account: Address, key: U256) -> TransportResult<U256> {
518        self.instrument_function("eth_getStorageAt", (account, key, PENDING_TAG))
519            .await
520            .inspect_err(|err| {
521                self.rpc_collector
522                    .logger()
523                    .error("Failed to get pending storage", err.to_string().as_str())
524            })
525    }
526
527    /// Returns the total number of transactions in the pending state.
528    ///
529    /// # Returns
530    ///
531    /// The number of pending transactions.
532    pub async fn pending_transaction_count(&self) -> TransportResult<u64> {
533        self.instrument_function("eth_getBlockTransactionCountByNumber", (PENDING_TAG,))
534            .await
535            .inspect_err(|err| {
536                self.rpc_collector
537                    .logger()
538                    .error("Failed to get transaction count", err.to_string().as_str())
539            })
540            .map(|result: U64| result.to())
541    }
542
543    /// Sends a signed transaction into the pending pool for execution.
544    ///
545    /// # Arguments
546    ///
547    /// * `tx` - The transaction to be executed.
548    ///
549    /// # Returns
550    ///
551    /// The hash of the given transaction.
552    pub async fn send_transaction(&self, tx: TxEnvelope) -> TransportResult<B256> {
553        let mut encoded_tx = Vec::new();
554        tx.encode(&mut encoded_tx);
555        self.instrument_function("eth_sendRawTransaction", (hex::encode(encoded_tx),))
556            .await
557            .inspect_err(|err| {
558                self.rpc_collector
559                    .logger()
560                    .error("Failed to send transaction", err.to_string().as_str())
561            })
562    }
563
564    /// Returns the value of key in the contract storage of the given account.
565    ///
566    /// # Arguments
567    ///
568    /// * `account` - The address of the contract.
569    /// * `key` - The position in the storage.
570    /// * `block_number` - The block number from which the storage is taken.
571    ///
572    /// # Returns
573    ///
574    /// The value of the storage position at the provided address.
575    pub async fn storage_at(
576        &self,
577        account: Address,
578        key: U256,
579        block_number: U256,
580    ) -> TransportResult<U256> {
581        self.instrument_function("eth_getStorageAt", (account, key, block_number))
582            .await
583            .inspect_err(|err| {
584                self.rpc_collector
585                    .logger()
586                    .error("Failed to get storage", err.to_string().as_str())
587            })
588    }
589
590    /// Subscribes to the results of a streaming filter query.
591    /// *Note:* this method fails if the InstrumentedClient does not use a web socket client.
592    /// # Arguments
593    ///
594    /// * `filter` - A filter query.
595    ///
596    /// # Returns
597    ///
598    /// The subscription.
599    ///
600    /// # Errors
601    ///
602    /// * If ws_client is `None`.
603    pub async fn subscribe_filter_logs<R: RpcReturn>(
604        &self,
605        filter: Filter,
606    ) -> TransportResult<Subscription<R>> {
607        let id: U256 = self
608            .instrument_function("eth_subscribe", ("logs", filter))
609            .await
610            .inspect_err(|err| {
611                self.rpc_collector.logger().error(
612                    "Failed to get logs subscription id",
613                    err.to_string().as_str(),
614                )
615            })?;
616        if let Some(ws_client) = self.ws_client.as_ref() {
617            ws_client.get_subscription(id.into()).await
618        } else {
619            Err(TransportError::UnsupportedFeature(
620                "http client does not support eth_subscribe calls.",
621            ))
622        }
623    }
624
625    /// Subscribes to notifications about the current blockchain head.
626    /// *Note:* this method fails if the InstrumentedClient does not use a web socket client.
627    ///
628    /// # Returns
629    ///
630    /// The subscription.
631    ///
632    /// # Errors
633    ///
634    /// * If ws_client is `None`.
635    pub async fn subscribe_new_head<R: RpcReturn>(&self) -> TransportResult<Subscription<R>> {
636        let id: U256 = self
637            .instrument_function("eth_subscribe", ("newHeads",))
638            .await
639            .inspect_err(|err| {
640                self.rpc_collector
641                    .logger()
642                    .error("Failed to subscribe new head", err.to_string().as_str())
643            })?;
644        if let Some(ws_client) = self.ws_client.as_ref() {
645            ws_client.get_subscription(id.into()).await
646        } else {
647            Err(TransportError::UnsupportedFeature(
648                "http client does not support eth_subscribe calls.",
649            ))
650        }
651    }
652
653    /// Retrieves the currently suggested gas price.
654    ///
655    /// # Returns
656    ///
657    /// The currently suggested gas price.
658    pub async fn suggest_gas_price(&self) -> TransportResult<u64> {
659        self.instrument_function("eth_gasPrice", ())
660            .await
661            .inspect_err(|err| {
662                self.rpc_collector
663                    .logger()
664                    .error("Failed to suggest gas price", err.to_string().as_str())
665            })
666            .map(|result: U64| result.to())
667    }
668
669    /// Retrieves the currently suggested gas tip cap after EIP1559.
670    ///
671    /// # Returns
672    ///
673    /// The currently suggested gas price.
674    pub async fn suggest_gas_tip_cap(&self) -> TransportResult<u64> {
675        self.instrument_function("eth_maxPriorityFeePerGas", ())
676            .await
677            .inspect_err(|err| {
678                self.rpc_collector
679                    .logger()
680                    .error("Failed to suggest gas tip cap", err.to_string().as_str())
681            })
682            .map(|result: U64| result.to())
683    }
684
685    /// Retrieves the current progress of the sync algorithm.
686    /// If there's no sync currently running, it returns None.
687    ///
688    /// # Returns
689    ///
690    /// The current progress of the sync algorithm.
691    pub async fn sync_progress(&self) -> TransportResult<SyncStatus> {
692        self.instrument_function("eth_syncing", ())
693            .await
694            .inspect_err(|err| {
695                self.rpc_collector
696                    .logger()
697                    .error("Failed to get sync progress", err.to_string().as_str())
698            })
699    }
700
701    /// Returns the transaction with the given hash.
702    ///
703    /// # Arguments
704    ///
705    /// * `tx_hash` - The transaction hash.
706    ///
707    /// # Returns
708    ///
709    /// The transaction with the given hash.
710    pub async fn transaction_by_hash(&self, tx_hash: B256) -> TransportResult<Transaction> {
711        self.instrument_function("eth_getTransactionByHash", (tx_hash,))
712            .await
713            .inspect_err(|err| {
714                self.rpc_collector.logger().error(
715                    "Failed to get transaction by hash",
716                    err.to_string().as_str(),
717                )
718            })
719    }
720
721    /// Returns the total number of transactions in the given block.
722    ///
723    /// # Arguments
724    ///
725    /// * `block_hash` - The block hash.
726    ///
727    /// # Returns
728    ///
729    /// The number of transactions in the given block.
730    pub async fn transaction_count(&self, block_hash: B256) -> TransportResult<u64> {
731        self.instrument_function("eth_getBlockTransactionCountByHash", (block_hash,))
732            .await
733            .inspect_err(|err| {
734                self.rpc_collector
735                    .logger()
736                    .error("Failed to get transaction count", err.to_string().as_str())
737            })
738            .map(|result: U64| result.to())
739    }
740
741    /// Returns a single transaction at index in the given block.
742    ///
743    /// # Arguments
744    ///
745    /// * `block_hash` - The block hash.
746    /// * `index` - The index of the transaction in the block.
747    ///
748    /// # Returns
749    ///
750    /// The transaction at index in the given block.
751    pub async fn transaction_in_block(
752        &self,
753        block_hash: B256,
754        index: u64,
755    ) -> TransportResult<Transaction> {
756        self.instrument_function("eth_getTransactionByBlockHashAndIndex", (block_hash, index))
757            .await
758            .inspect_err(|err| {
759                self.rpc_collector
760                    .logger()
761                    .error("Failed to get transaction", err.to_string().as_str())
762            })
763    }
764
765    /// Returns the receipt of a transaction by transaction hash.
766    /// *Note:* the receipt is not available for pending transactions.
767    ///
768    /// # Arguments
769    ///
770    /// * `tx_hash` - The hash of the transaction.
771    ///
772    /// # Returns
773    ///
774    /// The transaction receipt.
775    pub async fn transaction_receipt(&self, tx_hash: B256) -> TransportResult<TransactionReceipt> {
776        self.instrument_function("eth_getTransactionReceipt", (tx_hash,))
777            .await
778            .inspect_err(|err| {
779                self.rpc_collector
780                    .logger()
781                    .error("Failed to get receipt", err.to_string().as_str())
782            })
783    }
784
785    /// Instrument a function call with the given method name and parameters.
786    ///
787    /// This function will measure the duration of the call and report it to the metrics collector.
788    ///
789    /// # Arguments
790    ///
791    /// * `rpc_method_name` - The name of the RPC method being called.
792    /// * `params` - The parameters to pass to the RPC method.
793    ///
794    /// # Returns
795    ///
796    /// The result of the RPC call.
797    async fn instrument_function<'async_trait, P, R>(
798        &self,
799        rpc_method_name: &str,
800        params: P,
801    ) -> TransportResult<R>
802    where
803        P: RpcParam + 'async_trait,
804        R: RpcReturn + 'async_trait,
805    {
806        let start = Instant::now();
807        let method_string = String::from(rpc_method_name);
808
809        // send the request with the provided client (http or ws)
810        let result = match (self.http_client.as_ref(), self.ws_client.as_ref()) {
811            (Some(http_client), _) => http_client.raw_request(method_string.into(), params).await,
812            (_, Some(ws_client)) => ws_client.raw_request(method_string.into(), params).await,
813            (_, _) => unreachable!(),
814        };
815
816        let rpc_request_duration = start.elapsed();
817
818        // we only observe the duration of successful calls (even though this is not well defined in the spec)
819        self.rpc_collector.set_rpc_request_duration_seconds(
820            rpc_method_name,
821            self.net_version.to_string().as_str(),
822            rpc_request_duration.as_secs_f64(),
823        );
824        result
825    }
826}
827
828#[cfg(test)]
829mod tests {
830    use super::*;
831    use alloy::consensus::{SignableTransaction, TxLegacy};
832    use alloy::network::TxSignerSync;
833    use alloy::primitives::address;
834    use alloy::primitives::{bytes, TxKind::Call, U256};
835    use alloy::rpc::types::eth::{
836        pubsub::SubscriptionResult, BlockId, BlockNumberOrTag, BlockTransactionsKind,
837    };
838    use eigen_common::get_provider;
839    use eigen_signer::signer::Config;
840    use eigen_testing_utils::anvil::{set_account_balance, start_anvil_container};
841    use eigen_testing_utils::transaction::wait_transaction;
842    use tokio;
843
844    #[tokio::test]
845    async fn test_suggest_gas_tip_cap() {
846        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
847
848        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
849        let fee_per_gas = instrumented_client.suggest_gas_tip_cap().await.unwrap();
850        let expected_fee_per_gas = get_provider(&http_endpoint)
851            .get_max_priority_fee_per_gas()
852            .await
853            .unwrap();
854        assert_eq!(expected_fee_per_gas as u64, fee_per_gas);
855    }
856
857    #[tokio::test]
858    async fn test_gas_price() {
859        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
860        let provider = get_provider(&http_endpoint);
861
862        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
863        let gas_price = instrumented_client.suggest_gas_price().await.unwrap();
864        let expected_gas_price = provider.clone().get_gas_price().await.unwrap();
865        assert_eq!(gas_price, expected_gas_price as u64);
866    }
867
868    #[tokio::test]
869    async fn test_sync_status() {
870        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
871        let provider = get_provider(&http_endpoint);
872
873        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
874        let sync_status = instrumented_client.sync_progress().await.unwrap();
875        let expected_sync_status = provider.clone().syncing().await.unwrap();
876        assert_eq!(expected_sync_status, sync_status);
877    }
878
879    #[tokio::test]
880    async fn test_chain_id() {
881        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
882        let provider = get_provider(&http_endpoint);
883
884        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
885
886        let expected_chain_id = provider.clone().get_chain_id().await.unwrap();
887        let chain_id = instrumented_client.chain_id().await.unwrap();
888
889        assert_eq!(expected_chain_id, chain_id);
890    }
891
892    #[tokio::test]
893    async fn test_balance_at() {
894        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
895        let provider = get_provider(&http_endpoint);
896
897        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
898        let address = provider.get_accounts().await.unwrap()[0];
899
900        let expected_balance_at = provider.get_balance(address).await.unwrap();
901        let balance_at = instrumented_client
902            .balance_at(address, BlockNumberOrTag::Latest)
903            .await
904            .unwrap();
905
906        assert_eq!(expected_balance_at, balance_at);
907    }
908
909    #[tokio::test]
910    async fn test_subscribe_new_head() {
911        let (_container, _http_endpoint, ws_endpoint) = start_anvil_container().await;
912
913        let instrumented_client = InstrumentedClient::new_ws(&ws_endpoint).await.unwrap();
914        let subscription: TransportResult<Subscription<SubscriptionResult>> =
915            instrumented_client.subscribe_new_head().await;
916        assert!(subscription.is_ok())
917    }
918
919    #[tokio::test]
920    async fn test_subscribe_filter_logs() {
921        let (_container, http_endpoint, ws_endpoint) = start_anvil_container().await;
922        let provider = get_provider(&http_endpoint);
923
924        let instrumented_client = InstrumentedClient::new_ws(&ws_endpoint).await.unwrap();
925        let address = provider.clone().get_accounts().await.unwrap()[0];
926        let filter = Filter::new().address(address.to_string().parse::<Address>().unwrap());
927
928        let subscription: TransportResult<Subscription<SubscriptionResult>> =
929            instrumented_client.subscribe_filter_logs(filter).await;
930
931        assert!(subscription.is_ok());
932    }
933
934    #[tokio::test]
935    async fn test_block_by_hash() {
936        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
937        let provider = get_provider(&http_endpoint);
938
939        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
940
941        // get the hash from the last block
942        let hash = provider
943            .get_block(BlockId::latest(), BlockTransactionsKind::Hashes)
944            .await
945            .unwrap()
946            .unwrap()
947            .header
948            .hash;
949
950        let expected_block = provider
951            .get_block_by_hash(hash, BlockTransactionsKind::Full)
952            .await
953            .unwrap();
954        let block = instrumented_client.block_by_hash(hash).await.unwrap();
955
956        assert_eq!(expected_block, block);
957    }
958
959    #[tokio::test]
960    async fn test_block_by_number() {
961        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
962        let provider = get_provider(&http_endpoint);
963
964        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
965        let block_number = 1;
966
967        let expected_block = provider
968            .get_block_by_number(block_number.into(), BlockTransactionsKind::Full)
969            .await
970            .unwrap();
971        let block = instrumented_client
972            .block_by_number(block_number.into())
973            .await
974            .unwrap();
975
976        assert_eq!(expected_block, block);
977    }
978
979    #[tokio::test]
980    async fn test_transaction_count() {
981        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
982        let provider = get_provider(&http_endpoint);
983
984        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
985
986        let block = provider
987            .get_block(BlockId::latest(), BlockTransactionsKind::Hashes)
988            .await
989            .unwrap()
990            .unwrap();
991
992        let block_hash = block.header.hash;
993        let tx_count = instrumented_client
994            .transaction_count(B256::from_slice(block_hash.as_slice()))
995            .await
996            .unwrap();
997        let expected_tx_count = block.transactions.len();
998
999        assert_eq!(tx_count, expected_tx_count as u64);
1000    }
1001
1002    /// This test tests the following methods
1003    /// * `send_transaction`
1004    /// * `transaction_by_hash`
1005    /// * `transaction_receipt`
1006    /// * `transaction_in_block`
1007    #[tokio::test]
1008    async fn test_transaction_methods() {
1009        let (container, rpc_url, _ws_endpoint) = start_anvil_container().await;
1010        let instrumented_client = InstrumentedClient::new(&rpc_url).await.unwrap();
1011
1012        // Set balance on a random address to use as sender
1013        let private_key_hex =
1014            "6b35c6d8110c888de06575b45181bf3f9e6c73451fa5cde812c95a6b31e66ddf".to_string();
1015        let address = "009440d62dc85c73dbf889b7ad1f4da8b231d2ef";
1016        set_account_balance(&container, address).await;
1017
1018        // build the transaction
1019        let to = address!("a0Ee7A142d267C1f36714E4a8F75612F20a79720");
1020        let mut tx = TxLegacy {
1021            to: Call(to),
1022            value: U256::from(0),
1023            gas_limit: 2_000_000,
1024            nonce: 0,
1025            gas_price: 21_000_000_000,
1026            input: bytes!(),
1027            chain_id: Some(31337),
1028        };
1029
1030        let config = Config::PrivateKey(private_key_hex);
1031        let signer = Config::signer_from_config(config).unwrap();
1032        let signature = signer.sign_transaction_sync(&mut tx).unwrap();
1033        let signed_tx = tx.into_signed(signature);
1034        let tx: TxEnvelope = TxEnvelope::from(signed_tx);
1035
1036        // test send_transaction
1037        let tx_hash = instrumented_client.send_transaction(tx).await;
1038        assert!(tx_hash.is_ok());
1039        let tx_hash = B256::from_slice(tx_hash.unwrap().as_ref());
1040
1041        // test transaction_by_hash
1042        let tx_by_hash = instrumented_client.transaction_by_hash(tx_hash).await;
1043        assert!(tx_by_hash.is_ok());
1044
1045        wait_transaction(&rpc_url, tx_hash).await.unwrap();
1046
1047        // test transaction_receipt
1048        let receipt = instrumented_client.transaction_receipt(tx_hash).await;
1049        assert!(receipt.is_ok());
1050        let receipt = receipt.unwrap();
1051
1052        // test transaction_in_block
1053        let tx_by_block = instrumented_client
1054            .transaction_in_block(
1055                receipt.block_hash.unwrap(),
1056                receipt.transaction_index.unwrap(),
1057            )
1058            .await;
1059        assert!(tx_by_block.is_ok());
1060    }
1061
1062    #[tokio::test]
1063    async fn test_estimate_gas() {
1064        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1065        let provider = get_provider(&http_endpoint);
1066
1067        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1068        let accounts = provider.get_accounts().await.unwrap();
1069        let from = accounts.first().unwrap();
1070        let to = accounts.get(1).unwrap();
1071
1072        // build the transaction
1073        let tx = TxLegacy {
1074            to: Call(*to),
1075            value: U256::from(0),
1076            gas_limit: 2_000_000,
1077            nonce: 0,
1078            gas_price: 1_000_000,
1079            input: bytes!(),
1080            chain_id: Some(31337),
1081        };
1082        let tx_request: TransactionRequest = tx.clone().into();
1083        let tx_request = tx_request.from(*from);
1084
1085        let expected_estimated_gas = provider.clone().estimate_gas(&tx_request).await.unwrap();
1086        let estimated_gas = instrumented_client.estimate_gas(tx_request).await.unwrap();
1087        assert_eq!(expected_estimated_gas, estimated_gas);
1088    }
1089
1090    #[tokio::test]
1091    async fn test_call_contract_and_pending_call_contract() {
1092        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1093        let provider = get_provider(&http_endpoint);
1094
1095        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1096
1097        let anvil = provider.clone();
1098        let accounts = anvil.get_accounts().await.unwrap();
1099        let from = accounts.first().unwrap();
1100        let to = accounts.get(1).unwrap();
1101
1102        let nonce = instrumented_client
1103            .nonce_at(*from, BlockNumberOrTag::Latest)
1104            .await
1105            .unwrap();
1106
1107        // build the transaction
1108        let tx = TxLegacy {
1109            to: Call(*to),
1110            value: U256::from(0),
1111            gas_limit: 1_000_000,
1112            nonce,
1113            gas_price: 21_000_000_000,
1114            input: bytes!(),
1115            chain_id: Some(31337),
1116        };
1117        let tx_request: TransactionRequest = tx.clone().into();
1118        let tx_request = tx_request.from(*from);
1119
1120        // test call_contract
1121        let expected_bytes = anvil.call(&tx_request).await.unwrap();
1122        let bytes = instrumented_client
1123            .call_contract(tx_request.clone(), BlockNumberOrTag::Latest)
1124            .await
1125            .unwrap();
1126        assert_eq!(expected_bytes, bytes);
1127
1128        // test pending_call_contract
1129        let bytes = instrumented_client.pending_call_contract(tx_request).await;
1130        assert!(bytes.is_ok());
1131    }
1132
1133    #[tokio::test]
1134    async fn test_filter_logs() {
1135        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1136        let provider = get_provider(&http_endpoint);
1137
1138        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1139        let address = provider.clone().get_accounts().await.unwrap()[0];
1140        let filter = Filter::new().address(address.to_string().parse::<Address>().unwrap());
1141
1142        let expected_logs = provider.clone().get_logs(&filter).await.unwrap();
1143        let logs = instrumented_client.filter_logs(filter).await.unwrap();
1144
1145        assert_eq!(expected_logs, logs);
1146    }
1147
1148    #[tokio::test]
1149    async fn test_storage_at() {
1150        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1151        let provider = get_provider(&http_endpoint);
1152
1153        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1154
1155        let account = provider.clone().get_accounts().await.unwrap()[0];
1156        let expected_storage = provider
1157            .clone()
1158            .get_storage_at(account, U256::ZERO)
1159            .await
1160            .unwrap();
1161
1162        let block_number = instrumented_client.block_number().await.unwrap();
1163        let storage = instrumented_client
1164            .storage_at(account, U256::ZERO, U256::from(block_number))
1165            .await
1166            .unwrap();
1167
1168        assert_eq!(expected_storage, storage);
1169    }
1170
1171    #[tokio::test]
1172    async fn test_block_number() {
1173        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1174        let provider = get_provider(&http_endpoint);
1175
1176        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1177
1178        let expected_block_number = provider.clone().get_block_number().await.unwrap();
1179        let block_number = instrumented_client.block_number().await.unwrap();
1180
1181        assert_eq!(expected_block_number, block_number);
1182    }
1183
1184    #[tokio::test]
1185    async fn test_code_at() {
1186        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1187        let provider = get_provider(&http_endpoint);
1188
1189        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1190        let address = provider.get_accounts().await.unwrap()[0];
1191
1192        let expected_code = provider.get_code_at(address).await.unwrap();
1193        let code = instrumented_client
1194            .code_at(address, BlockNumberOrTag::Latest)
1195            .await
1196            .unwrap();
1197
1198        assert_eq!(expected_code, code);
1199    }
1200
1201    #[tokio::test]
1202    async fn test_fee_history() {
1203        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1204        let provider = get_provider(&http_endpoint);
1205
1206        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1207        let block_count = 4;
1208        let last_block = BlockNumberOrTag::Latest;
1209        let reward_percentiles = [0.2, 0.3];
1210
1211        let expected_fee_history = provider
1212            .get_fee_history(block_count, last_block, &reward_percentiles)
1213            .await
1214            .unwrap();
1215        let fee_history = instrumented_client
1216            .fee_history(block_count, last_block, &reward_percentiles)
1217            .await
1218            .unwrap();
1219
1220        assert_eq!(expected_fee_history, fee_history);
1221    }
1222
1223    #[tokio::test]
1224    async fn test_header_by_hash() {
1225        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1226        let provider = get_provider(&http_endpoint);
1227
1228        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1229        let hash = provider
1230            .get_block(BlockId::latest(), BlockTransactionsKind::Hashes)
1231            .await
1232            .unwrap()
1233            .unwrap()
1234            .header
1235            .hash;
1236        let expected_header = provider
1237            .get_block_by_hash(hash, BlockTransactionsKind::Hashes)
1238            .await
1239            .unwrap()
1240            .unwrap()
1241            .header;
1242        let header = instrumented_client.header_by_hash(hash).await.unwrap();
1243
1244        assert_eq!(expected_header, header);
1245    }
1246
1247    #[tokio::test]
1248    async fn test_header_by_number() {
1249        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1250        let provider = get_provider(&http_endpoint);
1251
1252        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1253        let block_number = BlockNumberOrTag::Earliest;
1254
1255        let header = instrumented_client
1256            .header_by_number(block_number)
1257            .await
1258            .unwrap();
1259
1260        let expected_header = provider
1261            .get_block_by_number(block_number, BlockTransactionsKind::Hashes)
1262            .await
1263            .unwrap()
1264            .unwrap()
1265            .header;
1266
1267        assert_eq!(expected_header, header);
1268    }
1269
1270    #[tokio::test]
1271    async fn test_nonce_at() {
1272        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1273        let provider = get_provider(&http_endpoint);
1274
1275        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1276        let address = provider.get_accounts().await.unwrap()[0];
1277
1278        let expected_nonce = provider.get_transaction_count(address).await.unwrap();
1279        let nonce = instrumented_client
1280            .nonce_at(address, BlockNumberOrTag::Latest)
1281            .await
1282            .unwrap();
1283
1284        assert_eq!(expected_nonce, nonce);
1285    }
1286
1287    #[tokio::test]
1288    async fn test_pending_balance_at() {
1289        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1290        let provider = get_provider(&http_endpoint);
1291
1292        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1293        let address = provider.get_accounts().await.unwrap()[0];
1294
1295        // TODO: currently comparing "pending" balance with "latest" balance. Check for pending transactions?
1296        let expected_balance = provider.get_balance(address).await.unwrap();
1297        let balance = instrumented_client
1298            .pending_balance_at(address)
1299            .await
1300            .unwrap();
1301
1302        assert_eq!(expected_balance, balance);
1303    }
1304
1305    #[tokio::test]
1306    async fn test_pending_code_at() {
1307        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1308        let provider = get_provider(&http_endpoint);
1309
1310        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1311        let address = provider.get_accounts().await.unwrap()[0];
1312
1313        // TODO: currently comparing "pending" with "latest". Check for pending transactions?
1314        let expected_code = provider.get_code_at(address).await.unwrap();
1315        let code = instrumented_client.pending_code_at(address).await.unwrap();
1316
1317        assert_eq!(expected_code, code);
1318    }
1319
1320    #[tokio::test]
1321    async fn test_pending_nonce_at() {
1322        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1323        let provider = get_provider(&http_endpoint);
1324
1325        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1326        let address = provider.get_accounts().await.unwrap()[0];
1327
1328        // TODO: currently comparing "pending" with "latest". Check for pending transactions?
1329        let expected_pending_nonce_at = provider.get_transaction_count(address).await.unwrap();
1330        let pending_nonce_at = instrumented_client.pending_nonce_at(address).await.unwrap();
1331
1332        assert_eq!(expected_pending_nonce_at, pending_nonce_at);
1333    }
1334
1335    #[tokio::test]
1336    async fn test_pending_storage_at() {
1337        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1338        let provider = get_provider(&http_endpoint);
1339
1340        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1341        let address = provider.get_accounts().await.unwrap()[0];
1342        let key = U256::from(10);
1343
1344        // TODO: currently comparing "pending" with "latest". Check for pending transactions?
1345        // TODO: set storage and check change
1346        let expected_pending_storage_at = provider.get_storage_at(address, key).await.unwrap();
1347        let pending_storage_at = instrumented_client
1348            .pending_storage_at(address, key)
1349            .await
1350            .unwrap();
1351
1352        assert_eq!(expected_pending_storage_at, pending_storage_at);
1353    }
1354
1355    #[tokio::test]
1356    async fn test_pending_transaction_count() {
1357        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1358        let provider = get_provider(&http_endpoint);
1359
1360        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1361
1362        let expected_transaction_count: u64 = provider
1363            .get_block_by_number(BlockNumberOrTag::Pending, BlockTransactionsKind::Hashes)
1364            .await
1365            .unwrap()
1366            .unwrap()
1367            .transactions
1368            .len() as u64;
1369
1370        let transaction_count = instrumented_client.pending_transaction_count().await;
1371
1372        assert_eq!(expected_transaction_count, transaction_count.unwrap());
1373    }
1374}