eigen_client_eth/
instrumented_client.rs

1use crate::client::BackendClient;
2use alloy::consensus::TxEnvelope;
3use alloy::primitives::{Address, BlockHash, BlockNumber, Bytes, ChainId, B256, U256, U64};
4use alloy::providers::{Provider, ProviderBuilder, RootProvider};
5use alloy::pubsub::Subscription;
6use alloy::rlp::Encodable;
7use alloy::rpc::json_rpc::{RpcRecv, RpcSend};
8use alloy::rpc::types::eth::{
9    Block, BlockNumberOrTag, FeeHistory, Filter, Header, Log, SyncStatus, Transaction,
10    TransactionReceipt, TransactionRequest,
11};
12use alloy::transports::ws::WsConnect;
13use alloy::transports::{TransportError, TransportResult};
14use eigen_logging::get_test_logger;
15use eigen_metrics_collectors_rpc_calls::RpcCallsMetrics as RpcCallsCollector;
16use hex;
17use std::time::Instant;
18use thiserror::Error;
19use url::Url;
20
21const PENDING_TAG: &str = "pending";
22
23/// This struct represents an instrumented client that can be used to interact with an Ethereum node.
24/// It provides a set of methods to interact with the node and measures the duration of the calls.
25pub struct InstrumentedClient {
26    http_client: Option<RootProvider>,
27    ws_client: Option<RootProvider>,
28    rpc_collector: RpcCallsCollector,
29    net_version: u64,
30}
31
32/// Possible errors raised in signer creation
33#[derive(Error, Debug)]
34pub enum InstrumentedClientError {
35    #[error("invalid url")]
36    InvalidUrl,
37    #[error("error getting version")]
38    ErrorGettingVersion,
39    #[error("error running command")]
40    CommandError,
41}
42
43#[async_trait::async_trait]
44impl BackendClient for InstrumentedClient {
45    type Error = InstrumentedClientError;
46
47    /// Returns the latest block number.
48    ///
49    /// # Returns
50    ///
51    /// The latest block number.
52    ///
53    /// # Errors
54    ///
55    /// Returns an error if the RPC call fails.
56    async fn block_number(&self) -> Result<BlockNumber, Self::Error> {
57        self.instrument_function("eth_blockNumber", ())
58            .await
59            .inspect_err(|err| {
60                self.rpc_collector
61                    .logger()
62                    .error("Failed to get block number", err.to_string().as_str())
63            })
64            .map_err(|_err| InstrumentedClientError::CommandError)
65            .map(|result: U64| result.to())
66    }
67
68    /// Returns the block having the given block number.
69    ///
70    /// # Arguments
71    ///
72    /// * `number` - The block number.
73    ///
74    /// # Returns
75    ///
76    /// The block having the given block number.
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if the RPC call fails.
81    async fn block_by_number(
82        &self,
83        number: BlockNumberOrTag,
84    ) -> Result<Option<Block>, Self::Error> {
85        self.instrument_function("eth_getBlockByNumber", (number, true))
86            .await
87            .inspect_err(|err| {
88                self.rpc_collector
89                    .logger()
90                    .error("Failed to get block by number", err.to_string().as_str())
91            })
92            .map_err(|_err| InstrumentedClientError::CommandError)
93    }
94}
95
96impl InstrumentedClient {
97    /// Creates a new instance of the InstrumentedClient.
98    ///
99    /// # Arguments
100    ///
101    /// * `url` - The URL of the RPC server.
102    ///
103    /// # Returns
104    ///
105    /// A new instance of the InstrumentedClient.
106    ///
107    /// # Errors
108    ///
109    /// Returns an error if the URL is invalid or if there is an error getting the version.
110    pub async fn new(url: &str) -> Result<Self, InstrumentedClientError> {
111        let url = Url::parse(url).map_err(|_| InstrumentedClientError::InvalidUrl)?;
112        let http_client = ProviderBuilder::new()
113            .disable_recommended_fillers()
114            .on_http(url);
115        let net_version = http_client
116            .get_net_version()
117            .await
118            .map_err(|_| InstrumentedClientError::ErrorGettingVersion)?;
119
120        let rpc_collector = RpcCallsCollector::new(get_test_logger().clone());
121        Ok(InstrumentedClient {
122            http_client: Some(http_client),
123            ws_client: None,
124            rpc_collector,
125            net_version,
126        })
127    }
128
129    /// Creates a new instance of the InstrumentedClient that supports ws connection.
130    ///
131    /// # Arguments
132    ///
133    /// * `url` - The ws URL of the RPC server .
134    ///
135    /// # Returns
136    ///
137    /// A new instance of the InstrumentedClient.
138    ///
139    /// # Errors
140    ///
141    /// Returns an error if the URL is invalid or if there is an error getting the version.
142    pub async fn new_ws(url: &str) -> Result<Self, InstrumentedClientError> {
143        let url = Url::parse(url).map_err(|_| InstrumentedClientError::InvalidUrl)?;
144        let ws_connect = WsConnect::new(url);
145
146        let ws_client = ProviderBuilder::new()
147            .disable_recommended_fillers()
148            .on_ws(ws_connect)
149            .await
150            .unwrap();
151        let net_version = ws_client
152            .get_net_version()
153            .await
154            .map_err(|_| InstrumentedClientError::ErrorGettingVersion)?;
155
156        let rpc_collector = RpcCallsCollector::new(get_test_logger().clone());
157        Ok(InstrumentedClient {
158            http_client: None,
159            ws_client: Some(ws_client),
160            rpc_collector,
161            net_version,
162        })
163    }
164
165    /// Creates a new instance of the InstrumentedClient from an existing client (`RootProvider`).
166    ///
167    /// # Arguments
168    ///
169    /// * `client` - The existing client (`RootProvider`).
170    ///
171    /// # Returns
172    ///
173    /// A new instance of the InstrumentedClient.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if there is an error getting the version.
178    pub async fn new_from_client(client: RootProvider) -> Result<Self, InstrumentedClientError> {
179        let net_version = client
180            .get_net_version()
181            .await
182            .map_err(|_| InstrumentedClientError::ErrorGettingVersion)?;
183
184        let rpc_collector = RpcCallsCollector::new(get_test_logger().clone());
185        Ok(InstrumentedClient {
186            http_client: Some(client),
187            ws_client: None,
188            rpc_collector,
189            net_version,
190        })
191    }
192
193    /// Returns the chain ID.
194    ///
195    /// # Returns
196    ///
197    /// The chain ID.
198    ///
199    /// # Errors
200    ///
201    /// Returns an error if the RPC call fails.
202    pub async fn chain_id(&self) -> TransportResult<ChainId> {
203        self.instrument_function("eth_chainId", ())
204            .await
205            .inspect_err(|err| {
206                self.rpc_collector
207                    .logger()
208                    .error("Failed to get chain id", err.to_string().as_str())
209            })
210            .map(|result: U64| result.to())
211    }
212
213    /// Returns the balance of the account at the given block number.
214    ///
215    /// # Arguments
216    ///
217    /// * `account` - The account address.
218    /// * `block_number` - The block number.
219    ///
220    /// # Returns
221    ///
222    /// The balance of the account at the given block number.
223    pub async fn balance_at(
224        &self,
225        account: Address,
226        block_number: BlockNumberOrTag,
227    ) -> TransportResult<U256> {
228        self.instrument_function("eth_getBalance", (account, block_number))
229            .await
230            .inspect_err(|err| {
231                self.rpc_collector
232                    .logger()
233                    .error("Failed to get balance", err.to_string().as_str())
234            })
235    }
236
237    /// Returns the block having the given block hash.
238    ///
239    /// # Arguments
240    ///
241    /// * `hash` - The block hash.
242    ///
243    /// # Returns
244    ///
245    /// The block having the given block hash.
246    pub async fn block_by_hash(&self, hash: BlockHash) -> TransportResult<Option<Block>> {
247        self.instrument_function("eth_getBlockByHash", (hash, true))
248            .await
249            .inspect_err(|err| {
250                self.rpc_collector
251                    .logger()
252                    .error("Failed to get block by hash", err.to_string().as_str())
253            })
254    }
255
256    /// Executes a message call transaction.
257    ///
258    /// # Arguments
259    ///
260    /// * `call` - The message call to be executed
261    /// * `block_number` - The block height at which the call runs. *Note:* in case this argument is n
262    ///
263    /// # Returns
264    ///
265    /// The returned value of the executed contract.
266    pub async fn call_contract(
267        &self,
268        call: TransactionRequest,
269        block_number: BlockNumberOrTag,
270    ) -> TransportResult<Bytes> {
271        self.instrument_function("eth_call", (call, block_number))
272            .await
273            .inspect_err(|err| {
274                self.rpc_collector
275                    .logger()
276                    .error("Failed to call contract", err.to_string().as_str())
277            })
278    }
279
280    /// Returns the compiled bytecode of a smart contract given its address and block number.
281    ///
282    /// # Arguments
283    ///
284    /// * `address` - The address of the smart contract.
285    /// * `block_number` - The block number.
286    ///
287    /// # Returns
288    ///
289    /// The compiled bytecode of the smart contract with the given address and block number.
290    pub async fn code_at(
291        &self,
292        address: Address,
293        block_number: BlockNumberOrTag,
294    ) -> TransportResult<Bytes> {
295        self.instrument_function("eth_getCode", (address, block_number))
296            .await
297            .inspect_err(|err| {
298                self.rpc_collector
299                    .logger()
300                    .error("Failed to get code", err.to_string().as_str())
301            })
302    }
303
304    /// Estimates the gas needed to execute a specific transaction.
305    ///
306    /// # Arguments
307    ///
308    /// * `tx` - The transaction from which the gas consumption is estimated.
309    ///
310    /// # Returns
311    ///
312    /// The estimated gas.
313    pub async fn estimate_gas(&self, tx: TransactionRequest) -> TransportResult<u64> {
314        self.instrument_function("eth_estimateGas", (tx,))
315            .await
316            .inspect_err(|err| {
317                self.rpc_collector
318                    .logger()
319                    .error("Failed to estimate gas", err.to_string().as_str())
320            })
321            .map(|result: U64| result.to())
322    }
323
324    /// Returns a collection of historical gas information.
325    ///
326    /// # Arguments
327    ///
328    /// * `block_count` - The number of blocks to include in the collection.
329    /// * `last_block` - The last block number to include in the collection.
330    /// * `reward_percentiles` - A sorted list of percentage points used to
331    ///   sample the effective priority fees per gas from each block. The samples are
332    ///   taken in ascending order and weighted by gas usage. The list is sorted increasingly.
333    pub async fn fee_history(
334        &self,
335        block_count: u64,
336        last_block: BlockNumberOrTag,
337        reward_percentiles: &[f64],
338    ) -> TransportResult<FeeHistory> {
339        self.instrument_function(
340            "eth_feeHistory",
341            (block_count, last_block, reward_percentiles),
342        )
343        .await
344        .inspect_err(|err| {
345            self.rpc_collector
346                .logger()
347                .error("Failed to get fee history", err.to_string().as_str())
348        })
349    }
350    /// Executes a filter query.
351    ///
352    /// # Arguments
353    ///
354    /// * `filter` - The filter query to be executed.
355    ///
356    /// # Returns
357    ///
358    /// A vector of logs.
359    pub async fn filter_logs(&self, filter: Filter) -> TransportResult<Vec<Log>> {
360        self.instrument_function("eth_getLogs", (filter,))
361            .await
362            .inspect_err(|err| {
363                self.rpc_collector
364                    .logger()
365                    .error("Failed to get filter logs", err.to_string().as_str())
366            })
367    }
368
369    /// Returns the block header with the given hash.
370    ///
371    /// # Arguments
372    ///
373    /// * `hash` - The block hash.
374    ///
375    /// # Returns
376    ///
377    /// The block header.
378    pub async fn header_by_hash(&self, hash: B256) -> TransportResult<Header> {
379        let transaction_detail = false;
380        self.instrument_function("eth_getBlockByHash", (hash, transaction_detail))
381            .await
382            .inspect_err(|err| {
383                self.rpc_collector
384                    .logger()
385                    .error("Failed to get header by hash", err.to_string().as_str())
386            })
387    }
388
389    /// Returns a block header with the given block number.
390    ///
391    /// # Arguments
392    ///
393    /// * `block_number` - The block number.
394    ///
395    /// # Returns
396    ///
397    /// The block header.
398    pub async fn header_by_number(
399        &self,
400        block_number: BlockNumberOrTag,
401    ) -> TransportResult<Header> {
402        let transaction_detail = false;
403        self.instrument_function("eth_getBlockByNumber", (block_number, transaction_detail))
404            .await
405            .inspect_err(|err| {
406                self.rpc_collector
407                    .logger()
408                    .error("Failed to get header by number", err.to_string().as_str())
409            })
410    }
411
412    /// Returns the nonce of the given account.
413    ///
414    /// # Arguments
415    ///
416    /// * `account` - The address of the account.
417    /// * `block_number` - The block number from where the nonce is taken.
418    ///
419    /// # Returns
420    ///
421    /// The nonce of the account.
422    pub async fn nonce_at(
423        &self,
424        account: Address,
425        block_number: BlockNumberOrTag,
426    ) -> TransportResult<u64> {
427        self.instrument_function("eth_getTransactionCount", (account, block_number))
428            .await
429            .inspect_err(|err| {
430                self.rpc_collector
431                    .logger()
432                    .error("Failed to get nonce", err.to_string().as_str())
433            })
434            .map(|result: U64| result.to())
435    }
436
437    /// Returns the wei balance of the given account in the pending state.
438    ///
439    /// # Arguments
440    ///
441    /// * `account` - The address of the account.
442    ///
443    /// # Returns
444    ///
445    /// The wei balance of the account.
446    pub async fn pending_balance_at(&self, account: Address) -> TransportResult<U256> {
447        self.instrument_function("eth_getBalance", (account, PENDING_TAG))
448            .await
449            .inspect_err(|err| {
450                self.rpc_collector
451                    .logger()
452                    .error("Failed to get pending balance", err.to_string().as_str())
453            })
454    }
455
456    /// Executes a message call transaction using the EVM.
457    /// The state seen by the contract call is the pending state.
458    ///
459    /// # Arguments
460    ///
461    /// * `call` - The message call to be executed
462    ///
463    /// # Returns
464    ///
465    /// The returned value of the executed contract.
466    pub async fn pending_call_contract(&self, call: TransactionRequest) -> TransportResult<Bytes> {
467        self.call_contract(call, BlockNumberOrTag::Pending).await
468    }
469
470    /// Returns the contract code of the given account in the pending state.
471    ///
472    /// # Arguments
473    ///
474    /// * `account` - The address of the contract.
475    ///
476    /// # Returns
477    ///
478    /// The contract code.
479    pub async fn pending_code_at(&self, account: Address) -> TransportResult<Bytes> {
480        self.instrument_function("eth_getCode", (account, PENDING_TAG))
481            .await
482            .inspect_err(|err| {
483                self.rpc_collector
484                    .logger()
485                    .error("Failed to get pending code", err.to_string().as_str())
486            })
487    }
488
489    /// Returns the account nonce of the given account in the pending state.
490    /// This is the nonce that should be used for the next transaction.
491    ///
492    /// # Arguments
493    ///
494    /// * `account` - The address of the account.
495    ///
496    /// # Returns
497    ///
498    /// * The nonce of the account in the pending state.
499    pub async fn pending_nonce_at(&self, account: Address) -> TransportResult<u64> {
500        self.instrument_function("eth_getTransactionCount", (account, PENDING_TAG))
501            .await
502            .inspect_err(|err| {
503                self.rpc_collector
504                    .logger()
505                    .error("Failed to get pending nonce", err.to_string().as_str())
506            })
507            .map(|result: U64| result.to())
508    }
509
510    /// Returns the value of key in the contract storage of the given account in the pending state.
511    ///
512    /// # Arguments
513    ///
514    /// * `account` - The address of the contract.
515    /// * `key` - The position in the storage.
516    ///
517    /// # Returns
518    ///
519    /// The value of the storage position at the provided address.
520    pub async fn pending_storage_at(&self, account: Address, key: U256) -> TransportResult<U256> {
521        self.instrument_function("eth_getStorageAt", (account, key, PENDING_TAG))
522            .await
523            .inspect_err(|err| {
524                self.rpc_collector
525                    .logger()
526                    .error("Failed to get pending storage", err.to_string().as_str())
527            })
528    }
529
530    /// Returns the total number of transactions in the pending state.
531    ///
532    /// # Returns
533    ///
534    /// The number of pending transactions.
535    pub async fn pending_transaction_count(&self) -> TransportResult<u64> {
536        self.instrument_function("eth_getBlockTransactionCountByNumber", (PENDING_TAG,))
537            .await
538            .inspect_err(|err| {
539                self.rpc_collector
540                    .logger()
541                    .error("Failed to get transaction count", err.to_string().as_str())
542            })
543            .map(|result: U64| result.to())
544    }
545
546    /// Sends a signed transaction into the pending pool for execution.
547    ///
548    /// # Arguments
549    ///
550    /// * `tx` - The transaction to be executed.
551    ///
552    /// # Returns
553    ///
554    /// The hash of the given transaction.
555    pub async fn send_transaction(&self, tx: TxEnvelope) -> TransportResult<B256> {
556        let mut encoded_tx = Vec::new();
557        tx.encode(&mut encoded_tx);
558        self.instrument_function("eth_sendRawTransaction", (hex::encode(encoded_tx),))
559            .await
560            .inspect_err(|err| {
561                self.rpc_collector
562                    .logger()
563                    .error("Failed to send transaction", err.to_string().as_str())
564            })
565    }
566
567    /// Returns the value of key in the contract storage of the given account.
568    ///
569    /// # Arguments
570    ///
571    /// * `account` - The address of the contract.
572    /// * `key` - The position in the storage.
573    /// * `block_number` - The block number from which the storage is taken.
574    ///
575    /// # Returns
576    ///
577    /// The value of the storage position at the provided address.
578    pub async fn storage_at(
579        &self,
580        account: Address,
581        key: U256,
582        block_number: U256,
583    ) -> TransportResult<U256> {
584        self.instrument_function("eth_getStorageAt", (account, key, block_number))
585            .await
586            .inspect_err(|err| {
587                self.rpc_collector
588                    .logger()
589                    .error("Failed to get storage", err.to_string().as_str())
590            })
591    }
592
593    /// Subscribes to the results of a streaming filter query.
594    /// *Note:* this method fails if the InstrumentedClient does not use a web socket client.
595    /// # Arguments
596    ///
597    /// * `filter` - A filter query.
598    ///
599    /// # Returns
600    ///
601    /// The subscription.
602    ///
603    /// # Errors
604    ///
605    /// * If ws_client is `None`.
606    pub async fn subscribe_filter_logs<R: RpcRecv>(
607        &self,
608        filter: Filter,
609    ) -> TransportResult<Subscription<R>> {
610        let id: U256 = self
611            .instrument_function("eth_subscribe", ("logs", filter))
612            .await
613            .inspect_err(|err| {
614                self.rpc_collector.logger().error(
615                    "Failed to get logs subscription id",
616                    err.to_string().as_str(),
617                )
618            })?;
619        if let Some(ws_client) = self.ws_client.as_ref() {
620            ws_client.get_subscription(id.into()).await
621        } else {
622            Err(TransportError::UnsupportedFeature(
623                "http client does not support eth_subscribe calls.",
624            ))
625        }
626    }
627
628    /// Subscribes to notifications about the current blockchain head.
629    /// *Note:* this method fails if the InstrumentedClient does not use a web socket client.
630    ///
631    /// # Returns
632    ///
633    /// The subscription.
634    ///
635    /// # Errors
636    ///
637    /// * If ws_client is `None`.
638    pub async fn subscribe_new_head<R: RpcRecv>(&self) -> TransportResult<Subscription<R>> {
639        let id: U256 = self
640            .instrument_function("eth_subscribe", ("newHeads",))
641            .await
642            .inspect_err(|err| {
643                self.rpc_collector
644                    .logger()
645                    .error("Failed to subscribe new head", err.to_string().as_str())
646            })?;
647        if let Some(ws_client) = self.ws_client.as_ref() {
648            ws_client.get_subscription(id.into()).await
649        } else {
650            Err(TransportError::UnsupportedFeature(
651                "http client does not support eth_subscribe calls.",
652            ))
653        }
654    }
655
656    /// Retrieves the currently suggested gas price.
657    ///
658    /// # Returns
659    ///
660    /// The currently suggested gas price.
661    pub async fn suggest_gas_price(&self) -> TransportResult<u64> {
662        self.instrument_function("eth_gasPrice", ())
663            .await
664            .inspect_err(|err| {
665                self.rpc_collector
666                    .logger()
667                    .error("Failed to suggest gas price", err.to_string().as_str())
668            })
669            .map(|result: U64| result.to())
670    }
671
672    /// Retrieves the currently suggested gas tip cap after EIP1559.
673    ///
674    /// # Returns
675    ///
676    /// The currently suggested gas price.
677    pub async fn suggest_gas_tip_cap(&self) -> TransportResult<u64> {
678        self.instrument_function("eth_maxPriorityFeePerGas", ())
679            .await
680            .inspect_err(|err| {
681                self.rpc_collector
682                    .logger()
683                    .error("Failed to suggest gas tip cap", err.to_string().as_str())
684            })
685            .map(|result: U64| result.to())
686    }
687
688    /// Retrieves the current progress of the sync algorithm.
689    /// If there's no sync currently running, it returns None.
690    ///
691    /// # Returns
692    ///
693    /// The current progress of the sync algorithm.
694    pub async fn sync_progress(&self) -> TransportResult<SyncStatus> {
695        self.instrument_function("eth_syncing", ())
696            .await
697            .inspect_err(|err| {
698                self.rpc_collector
699                    .logger()
700                    .error("Failed to get sync progress", err.to_string().as_str())
701            })
702    }
703
704    /// Returns the transaction with the given hash.
705    ///
706    /// # Arguments
707    ///
708    /// * `tx_hash` - The transaction hash.
709    ///
710    /// # Returns
711    ///
712    /// The transaction with the given hash.
713    pub async fn transaction_by_hash(&self, tx_hash: B256) -> TransportResult<Transaction> {
714        self.instrument_function("eth_getTransactionByHash", (tx_hash,))
715            .await
716            .inspect_err(|err| {
717                self.rpc_collector.logger().error(
718                    "Failed to get transaction by hash",
719                    err.to_string().as_str(),
720                )
721            })
722    }
723
724    /// Returns the total number of transactions in the given block.
725    ///
726    /// # Arguments
727    ///
728    /// * `block_hash` - The block hash.
729    ///
730    /// # Returns
731    ///
732    /// The number of transactions in the given block.
733    pub async fn transaction_count(&self, block_hash: B256) -> TransportResult<u64> {
734        self.instrument_function("eth_getBlockTransactionCountByHash", (block_hash,))
735            .await
736            .inspect_err(|err| {
737                self.rpc_collector
738                    .logger()
739                    .error("Failed to get transaction count", err.to_string().as_str())
740            })
741            .map(|result: U64| result.to())
742    }
743
744    /// Returns a single transaction at index in the given block.
745    ///
746    /// # Arguments
747    ///
748    /// * `block_hash` - The block hash.
749    /// * `index` - The index of the transaction in the block.
750    ///
751    /// # Returns
752    ///
753    /// The transaction at index in the given block.
754    pub async fn transaction_in_block(
755        &self,
756        block_hash: B256,
757        index: u64,
758    ) -> TransportResult<Transaction> {
759        self.instrument_function("eth_getTransactionByBlockHashAndIndex", (block_hash, index))
760            .await
761            .inspect_err(|err| {
762                self.rpc_collector
763                    .logger()
764                    .error("Failed to get transaction", err.to_string().as_str())
765            })
766    }
767
768    /// Returns the receipt of a transaction by transaction hash.
769    /// *Note:* the receipt is not available for pending transactions.
770    ///
771    /// # Arguments
772    ///
773    /// * `tx_hash` - The hash of the transaction.
774    ///
775    /// # Returns
776    ///
777    /// The transaction receipt.
778    pub async fn transaction_receipt(&self, tx_hash: B256) -> TransportResult<TransactionReceipt> {
779        self.instrument_function("eth_getTransactionReceipt", (tx_hash,))
780            .await
781            .inspect_err(|err| {
782                self.rpc_collector
783                    .logger()
784                    .error("Failed to get receipt", err.to_string().as_str())
785            })
786    }
787
788    /// Instrument a function call with the given method name and parameters.
789    ///
790    /// This function will measure the duration of the call and report it to the metrics collector.
791    ///
792    /// # Arguments
793    ///
794    /// * `rpc_method_name` - The name of the RPC method being called.
795    /// * `params` - The parameters to pass to the RPC method.
796    ///
797    /// # Returns
798    ///
799    /// The result of the RPC call.
800    async fn instrument_function<'async_trait, P, R>(
801        &self,
802        rpc_method_name: &str,
803        params: P,
804    ) -> TransportResult<R>
805    where
806        P: RpcSend + 'async_trait,
807        R: RpcRecv + 'async_trait,
808    {
809        let start = Instant::now();
810        let method_string = String::from(rpc_method_name);
811
812        // send the request with the provided client (http or ws)
813        let result = match (self.http_client.as_ref(), self.ws_client.as_ref()) {
814            (Some(http_client), _) => http_client.raw_request(method_string.into(), params).await,
815            (_, Some(ws_client)) => ws_client.raw_request(method_string.into(), params).await,
816            (_, _) => unreachable!(),
817        };
818
819        let rpc_request_duration = start.elapsed();
820
821        // we only observe the duration of successful calls (even though this is not well defined in the spec)
822        self.rpc_collector.set_rpc_request_duration_seconds(
823            rpc_method_name,
824            self.net_version.to_string().as_str(),
825            rpc_request_duration.as_secs_f64(),
826        );
827        result
828    }
829}
830
831#[cfg(test)]
832mod tests {
833    use super::*;
834    use alloy::consensus::{SignableTransaction, TxLegacy};
835    use alloy::network::TxSignerSync;
836    use alloy::primitives::address;
837    use alloy::primitives::{bytes, TxKind::Call, U256};
838    use alloy::rpc::types::eth::{pubsub::SubscriptionResult, BlockId, BlockNumberOrTag};
839    use eigen_common::get_provider;
840    use eigen_signer::signer::Config;
841    use eigen_testing_utils::anvil::{set_account_balance, start_anvil_container};
842    use eigen_testing_utils::transaction::wait_transaction;
843    use tokio;
844
845    #[tokio::test]
846    async fn test_suggest_gas_tip_cap() {
847        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
848
849        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
850        let fee_per_gas = instrumented_client.suggest_gas_tip_cap().await.unwrap();
851        let expected_fee_per_gas = get_provider(&http_endpoint)
852            .get_max_priority_fee_per_gas()
853            .await
854            .unwrap();
855        assert_eq!(expected_fee_per_gas as u64, fee_per_gas);
856    }
857
858    #[tokio::test]
859    async fn test_gas_price() {
860        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
861        let provider = get_provider(&http_endpoint);
862
863        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
864        let gas_price = instrumented_client.suggest_gas_price().await.unwrap();
865        let expected_gas_price = provider.clone().get_gas_price().await.unwrap();
866        assert_eq!(gas_price, expected_gas_price as u64);
867    }
868
869    #[tokio::test]
870    async fn test_sync_status() {
871        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
872        let provider = get_provider(&http_endpoint);
873
874        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
875        let sync_status = instrumented_client.sync_progress().await.unwrap();
876        let expected_sync_status = provider.clone().syncing().await.unwrap();
877        assert_eq!(expected_sync_status, sync_status);
878    }
879
880    #[tokio::test]
881    async fn test_chain_id() {
882        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
883        let provider = get_provider(&http_endpoint);
884
885        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
886
887        let expected_chain_id = provider.clone().get_chain_id().await.unwrap();
888        let chain_id = instrumented_client.chain_id().await.unwrap();
889
890        assert_eq!(expected_chain_id, chain_id);
891    }
892
893    #[tokio::test]
894    async fn test_balance_at() {
895        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
896        let provider = get_provider(&http_endpoint);
897
898        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
899        let address = provider.get_accounts().await.unwrap()[0];
900
901        let expected_balance_at = provider.get_balance(address).await.unwrap();
902        let balance_at = instrumented_client
903            .balance_at(address, BlockNumberOrTag::Latest)
904            .await
905            .unwrap();
906
907        assert_eq!(expected_balance_at, balance_at);
908    }
909
910    #[tokio::test]
911    async fn test_subscribe_new_head() {
912        let (_container, _http_endpoint, ws_endpoint) = start_anvil_container().await;
913
914        let instrumented_client = InstrumentedClient::new_ws(&ws_endpoint).await.unwrap();
915        let subscription: TransportResult<Subscription<SubscriptionResult>> =
916            instrumented_client.subscribe_new_head().await;
917        assert!(subscription.is_ok())
918    }
919
920    #[tokio::test]
921    async fn test_subscribe_filter_logs() {
922        let (_container, http_endpoint, ws_endpoint) = start_anvil_container().await;
923        let provider = get_provider(&http_endpoint);
924
925        let instrumented_client = InstrumentedClient::new_ws(&ws_endpoint).await.unwrap();
926        let address = provider.clone().get_accounts().await.unwrap()[0];
927        let filter = Filter::new().address(address.to_string().parse::<Address>().unwrap());
928
929        let subscription: TransportResult<Subscription<SubscriptionResult>> =
930            instrumented_client.subscribe_filter_logs(filter).await;
931
932        assert!(subscription.is_ok());
933    }
934
935    #[tokio::test]
936    async fn test_block_by_hash() {
937        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
938        let provider = get_provider(&http_endpoint);
939
940        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
941
942        // get the hash from the last block
943        let hash = provider
944            .get_block(BlockId::latest())
945            .await
946            .unwrap()
947            .unwrap()
948            .header
949            .hash;
950
951        let expected_block = provider.get_block_by_hash(hash).full().await.unwrap();
952        let block = instrumented_client.block_by_hash(hash).await.unwrap();
953
954        assert_eq!(expected_block, block);
955    }
956
957    #[tokio::test]
958    async fn test_block_by_number() {
959        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
960        let provider = get_provider(&http_endpoint);
961
962        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
963        let block_number = 1;
964
965        let expected_block = provider
966            .get_block_by_number(block_number.into())
967            .full()
968            .await
969            .unwrap();
970        let block = instrumented_client
971            .block_by_number(block_number.into())
972            .await
973            .unwrap();
974
975        assert_eq!(expected_block, block);
976    }
977
978    #[tokio::test]
979    async fn test_transaction_count() {
980        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
981        let provider = get_provider(&http_endpoint);
982
983        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
984
985        let block = provider
986            .get_block(BlockId::latest())
987            .await
988            .unwrap()
989            .unwrap();
990
991        let block_hash = block.header.hash;
992        let tx_count = instrumented_client
993            .transaction_count(B256::from_slice(block_hash.as_slice()))
994            .await
995            .unwrap();
996        let expected_tx_count = block.transactions.len();
997
998        assert_eq!(tx_count, expected_tx_count as u64);
999    }
1000
1001    /// This test tests the following methods
1002    /// * `send_transaction`
1003    /// * `transaction_by_hash`
1004    /// * `transaction_receipt`
1005    /// * `transaction_in_block`
1006    #[tokio::test]
1007    async fn test_transaction_methods() {
1008        let (container, rpc_url, _ws_endpoint) = start_anvil_container().await;
1009        let instrumented_client = InstrumentedClient::new(&rpc_url).await.unwrap();
1010
1011        // Set balance on a random address to use as sender
1012        let private_key_hex =
1013            "6b35c6d8110c888de06575b45181bf3f9e6c73451fa5cde812c95a6b31e66ddf".to_string();
1014        let address = "009440d62dc85c73dbf889b7ad1f4da8b231d2ef";
1015        set_account_balance(&container, address).await;
1016
1017        // build the transaction
1018        let to = address!("a0Ee7A142d267C1f36714E4a8F75612F20a79720");
1019        let mut tx = TxLegacy {
1020            to: Call(to),
1021            value: U256::from(0),
1022            gas_limit: 2_000_000,
1023            nonce: 0,
1024            gas_price: 21_000_000_000,
1025            input: bytes!(),
1026            chain_id: Some(31337),
1027        };
1028
1029        let config = Config::PrivateKey(private_key_hex);
1030        let signer = Config::signer_from_config(config).unwrap();
1031        let signature = signer.sign_transaction_sync(&mut tx).unwrap();
1032        let signed_tx = tx.into_signed(signature);
1033        let tx: TxEnvelope = TxEnvelope::from(signed_tx);
1034
1035        // test send_transaction
1036        let tx_hash = instrumented_client.send_transaction(tx).await;
1037        assert!(tx_hash.is_ok());
1038        let tx_hash = B256::from_slice(tx_hash.unwrap().as_ref());
1039
1040        // test transaction_by_hash
1041        let tx_by_hash = instrumented_client.transaction_by_hash(tx_hash).await;
1042        assert!(tx_by_hash.is_ok());
1043
1044        wait_transaction(&rpc_url, tx_hash).await.unwrap();
1045
1046        // test transaction_receipt
1047        let receipt = instrumented_client.transaction_receipt(tx_hash).await;
1048        assert!(receipt.is_ok());
1049        let receipt = receipt.unwrap();
1050
1051        // test transaction_in_block
1052        let tx_by_block = instrumented_client
1053            .transaction_in_block(
1054                receipt.block_hash.unwrap(),
1055                receipt.transaction_index.unwrap(),
1056            )
1057            .await;
1058        assert!(tx_by_block.is_ok());
1059    }
1060
1061    #[tokio::test]
1062    async fn test_estimate_gas() {
1063        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1064        let provider = get_provider(&http_endpoint);
1065
1066        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1067        let accounts = provider.get_accounts().await.unwrap();
1068        let from = accounts.first().unwrap();
1069        let to = accounts.get(1).unwrap();
1070
1071        // build the transaction
1072        let tx = TxLegacy {
1073            to: Call(*to),
1074            value: U256::from(0),
1075            gas_limit: 2_000_000,
1076            nonce: 0,
1077            gas_price: 1_000_000,
1078            input: bytes!(),
1079            chain_id: Some(31337),
1080        };
1081        let tx_request: TransactionRequest = tx.clone().into();
1082        let tx_request = tx_request.from(*from);
1083
1084        let expected_estimated_gas = provider
1085            .clone()
1086            .estimate_gas(tx_request.clone())
1087            .await
1088            .unwrap();
1089        let estimated_gas = instrumented_client.estimate_gas(tx_request).await.unwrap();
1090        assert_eq!(expected_estimated_gas, estimated_gas);
1091    }
1092
1093    #[tokio::test]
1094    async fn test_call_contract_and_pending_call_contract() {
1095        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1096        let provider = get_provider(&http_endpoint);
1097
1098        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1099
1100        let anvil = provider.clone();
1101        let accounts = anvil.get_accounts().await.unwrap();
1102        let from = accounts.first().unwrap();
1103        let to = accounts.get(1).unwrap();
1104
1105        let nonce = instrumented_client
1106            .nonce_at(*from, BlockNumberOrTag::Latest)
1107            .await
1108            .unwrap();
1109
1110        // build the transaction
1111        let tx = TxLegacy {
1112            to: Call(*to),
1113            value: U256::from(0),
1114            gas_limit: 1_000_000,
1115            nonce,
1116            gas_price: 21_000_000_000,
1117            input: bytes!(),
1118            chain_id: Some(31337),
1119        };
1120        let tx_request: TransactionRequest = tx.clone().into();
1121        let tx_request = tx_request.from(*from);
1122
1123        // test call_contract
1124        let expected_bytes = anvil.call(tx_request.clone()).await.unwrap();
1125        let bytes = instrumented_client
1126            .call_contract(tx_request.clone(), BlockNumberOrTag::Latest)
1127            .await
1128            .unwrap();
1129        assert_eq!(expected_bytes, bytes);
1130
1131        // test pending_call_contract
1132        let bytes = instrumented_client.pending_call_contract(tx_request).await;
1133        assert!(bytes.is_ok());
1134    }
1135
1136    #[tokio::test]
1137    async fn test_filter_logs() {
1138        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1139        let provider = get_provider(&http_endpoint);
1140
1141        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1142        let address = provider.clone().get_accounts().await.unwrap()[0];
1143        let filter = Filter::new().address(address.to_string().parse::<Address>().unwrap());
1144
1145        let expected_logs = provider.clone().get_logs(&filter).await.unwrap();
1146        let logs = instrumented_client.filter_logs(filter).await.unwrap();
1147
1148        assert_eq!(expected_logs, logs);
1149    }
1150
1151    #[tokio::test]
1152    async fn test_storage_at() {
1153        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1154        let provider = get_provider(&http_endpoint);
1155
1156        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1157
1158        let account = provider.clone().get_accounts().await.unwrap()[0];
1159        let expected_storage = provider
1160            .clone()
1161            .get_storage_at(account, U256::ZERO)
1162            .await
1163            .unwrap();
1164
1165        let block_number = instrumented_client.block_number().await.unwrap();
1166        let storage = instrumented_client
1167            .storage_at(account, U256::ZERO, U256::from(block_number))
1168            .await
1169            .unwrap();
1170
1171        assert_eq!(expected_storage, storage);
1172    }
1173
1174    #[tokio::test]
1175    async fn test_block_number() {
1176        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1177        let provider = get_provider(&http_endpoint);
1178
1179        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1180
1181        let expected_block_number = provider.clone().get_block_number().await.unwrap();
1182        let block_number = instrumented_client.block_number().await.unwrap();
1183
1184        assert_eq!(expected_block_number, block_number);
1185    }
1186
1187    #[tokio::test]
1188    async fn test_code_at() {
1189        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1190        let provider = get_provider(&http_endpoint);
1191
1192        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1193        let address = provider.get_accounts().await.unwrap()[0];
1194
1195        let expected_code = provider.get_code_at(address).await.unwrap();
1196        let code = instrumented_client
1197            .code_at(address, BlockNumberOrTag::Latest)
1198            .await
1199            .unwrap();
1200
1201        assert_eq!(expected_code, code);
1202    }
1203
1204    #[tokio::test]
1205    async fn test_fee_history() {
1206        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1207        let provider = get_provider(&http_endpoint);
1208
1209        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1210        let block_count = 4;
1211        let last_block = BlockNumberOrTag::Latest;
1212        let reward_percentiles = [0.2, 0.3];
1213
1214        let expected_fee_history = provider
1215            .get_fee_history(block_count, last_block, &reward_percentiles)
1216            .await
1217            .unwrap();
1218        let fee_history = instrumented_client
1219            .fee_history(block_count, last_block, &reward_percentiles)
1220            .await
1221            .unwrap();
1222
1223        assert_eq!(expected_fee_history, fee_history);
1224    }
1225
1226    #[tokio::test]
1227    async fn test_header_by_hash() {
1228        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1229        let provider = get_provider(&http_endpoint);
1230
1231        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1232        let hash = provider
1233            .get_block(BlockId::latest())
1234            .await
1235            .unwrap()
1236            .unwrap()
1237            .header
1238            .hash;
1239        let expected_header = provider
1240            .get_block_by_hash(hash)
1241            .await
1242            .unwrap()
1243            .unwrap()
1244            .header;
1245        let header = instrumented_client.header_by_hash(hash).await.unwrap();
1246
1247        assert_eq!(expected_header, header);
1248    }
1249
1250    #[tokio::test]
1251    async fn test_header_by_number() {
1252        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1253        let provider = get_provider(&http_endpoint);
1254
1255        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1256        let block_number = BlockNumberOrTag::Earliest;
1257
1258        let header = instrumented_client
1259            .header_by_number(block_number)
1260            .await
1261            .unwrap();
1262
1263        let expected_header = provider
1264            .get_block_by_number(block_number)
1265            .await
1266            .unwrap()
1267            .unwrap()
1268            .header;
1269
1270        assert_eq!(expected_header, header);
1271    }
1272
1273    #[tokio::test]
1274    async fn test_nonce_at() {
1275        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1276        let provider = get_provider(&http_endpoint);
1277
1278        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1279        let address = provider.get_accounts().await.unwrap()[0];
1280
1281        let expected_nonce = provider.get_transaction_count(address).await.unwrap();
1282        let nonce = instrumented_client
1283            .nonce_at(address, BlockNumberOrTag::Latest)
1284            .await
1285            .unwrap();
1286
1287        assert_eq!(expected_nonce, nonce);
1288    }
1289
1290    #[tokio::test]
1291    async fn test_pending_balance_at() {
1292        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1293        let provider = get_provider(&http_endpoint);
1294
1295        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1296        let address = provider.get_accounts().await.unwrap()[0];
1297
1298        // TODO: currently comparing "pending" balance with "latest" balance. Check for pending transactions?
1299        let expected_balance = provider.get_balance(address).await.unwrap();
1300        let balance = instrumented_client
1301            .pending_balance_at(address)
1302            .await
1303            .unwrap();
1304
1305        assert_eq!(expected_balance, balance);
1306    }
1307
1308    #[tokio::test]
1309    async fn test_pending_code_at() {
1310        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1311        let provider = get_provider(&http_endpoint);
1312
1313        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1314        let address = provider.get_accounts().await.unwrap()[0];
1315
1316        // TODO: currently comparing "pending" with "latest". Check for pending transactions?
1317        let expected_code = provider.get_code_at(address).await.unwrap();
1318        let code = instrumented_client.pending_code_at(address).await.unwrap();
1319
1320        assert_eq!(expected_code, code);
1321    }
1322
1323    #[tokio::test]
1324    async fn test_pending_nonce_at() {
1325        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1326        let provider = get_provider(&http_endpoint);
1327
1328        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1329        let address = provider.get_accounts().await.unwrap()[0];
1330
1331        // TODO: currently comparing "pending" with "latest". Check for pending transactions?
1332        let expected_pending_nonce_at = provider.get_transaction_count(address).await.unwrap();
1333        let pending_nonce_at = instrumented_client.pending_nonce_at(address).await.unwrap();
1334
1335        assert_eq!(expected_pending_nonce_at, pending_nonce_at);
1336    }
1337
1338    #[tokio::test]
1339    async fn test_pending_storage_at() {
1340        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1341        let provider = get_provider(&http_endpoint);
1342
1343        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1344        let address = provider.get_accounts().await.unwrap()[0];
1345        let key = U256::from(10);
1346
1347        // TODO: currently comparing "pending" with "latest". Check for pending transactions?
1348        // TODO: set storage and check change
1349        let expected_pending_storage_at = provider.get_storage_at(address, key).await.unwrap();
1350        let pending_storage_at = instrumented_client
1351            .pending_storage_at(address, key)
1352            .await
1353            .unwrap();
1354
1355        assert_eq!(expected_pending_storage_at, pending_storage_at);
1356    }
1357
1358    #[tokio::test]
1359    async fn test_pending_transaction_count() {
1360        let (_container, http_endpoint, _ws_endpoint) = start_anvil_container().await;
1361        let provider = get_provider(&http_endpoint);
1362
1363        let instrumented_client = InstrumentedClient::new(&http_endpoint).await.unwrap();
1364
1365        let expected_transaction_count: u64 = provider
1366            .get_block_by_number(BlockNumberOrTag::Pending)
1367            .await
1368            .unwrap()
1369            .unwrap()
1370            .transactions
1371            .len() as u64;
1372
1373        let transaction_count = instrumented_client.pending_transaction_count().await;
1374
1375        assert_eq!(expected_transaction_count, transaction_count.unwrap());
1376    }
1377}